package com.lagou.spark.part01

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
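
// Requires the spark-streaming-kafka-0-10 integration on the classpath; in sbt
// (the version below is an assumption, match it to your Spark build):
//   libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.5"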

/**
 * @author: yehw
 * @date: 2020/11/12 18:44
 * @description: Kafka direct-stream consumer
 */
object KafkaConsumer {
  def main(args: Array[String]): Unit = {
    // Initialization: silence Spark's INFO logging so the batch output stays readable
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getCanonicalName)
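    // Micro-batch interval of 2 seconds: each RDD in the stream covers 2s of input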
    val ssc = new StreamingContext(conf, Seconds(2))
    // Kafka consumer parameters
    val kafkaParams: Map[String, Object] =
      getKafkaConsumerParameters("groupId")
    val topics: Array[String] = Array("topicB")
    // Pull data from Kafka as a direct stream (executors read the partitions directly)
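    // PreferConsistent spreads the topic partitions evenly across the available
    // executors; Subscribe lets Kafka assign topic partitions to this consumer group.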
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    // DStream output: print a record count for every non-empty batch
    dstream.foreachRDD { (rdd, time) =>
      if (!rdd.isEmpty()) {
        println(s"*********** rdd.count = ${rdd.count()}; time = $time ***********")
      }
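      // Manual offset commit (a sketch added here, not part of the original logic):
      // with enable.auto.commit=false nothing ever commits offsets, so a restart
      // would fall back to auto.offset.reset. The kafka-0-10 integration exposes
      // each batch's offset ranges; committing them asynchronously after the batch
      // has been handled gives at-least-once semantics.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)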
    }
    ssc.start()
    ssc.awaitTermination()
  }


  def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092,linux122:9092,linux123:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
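      // earliest: when the group has no committed offset, start from the beginning
      // of each partition; auto-commit is disabled so offsets are only stored when
      // committed explicitly (see the commitAsync call in main)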
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }
}
